/*
 * MIT License
 *
 * Copyright (c) 2020 wen.gu <454727014@qq.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

 /***************************************************************************
 * Name: message_broker.cpp
 *
 * Purpose: message base broker implmentation
 *
 * Developer:
 *   wen.gu , 2020-06-18
 *
 * TODO:
 *
 ***************************************************************************/

 /******************************************************************************
 **    INCLUDES
 ******************************************************************************/
#include "icpp/com/message_broker_simple.h"

#include <set>
#define LOG_TAG "msgb"
#include "icpp/core/log.h"

namespace icpp
{
namespace com
{
/******************************************************************************
 **    MACROS
 ******************************************************************************/

/******************************************************************************
 **    VARIABLE DEFINITIONS
 ******************************************************************************/


/******************************************************************************
 **    inner FUNCTION DEFINITIONS
 ******************************************************************************/

static bool CheckEndpointExist(MessageBrokerSimple::EndpointArray& eps, Endpoint* ep)
{
    for (auto it: eps)
    {
        if ((it != nullptr) && ((it == ep) || (it->name() == it->name())))
        {
            return true;
        }
    }

    return false;
    
}

static EndpointRole BrokerRole2EndpointRole(MessageBrokerRole role)
{
    EndpointRole ret = EndpointRole::kUnknown;
    switch (role)
    {
    case MessageBrokerRole::kService: ret = EndpointRole::kService; break;
    case MessageBrokerRole::kClient:  ret = EndpointRole::kClient; break;    
    default:
        break;
    }

    return ret;
}

/******************************************************************************
 **    FUNCTION DEFINITIONS
 ******************************************************************************/
MessageBrokerSimple::MessageBrokerSimple(MessageBrokerRole role)
    :msg_broker_role_(role),
    next_endpoint_id_(INVALID_ENDPOINT_ID),
    next_session_id_(INVALID_SESSION_ID)
{
    /** todo something */
}

MessageBrokerSimple::~MessageBrokerSimple()
{
    /** todo something */
    uninitialize();
}

MessageBrokerSimple::IcppErrc MessageBrokerSimple::initialize(const UrlList& urls, const MessageBrokerInitializeParam& param) 
{
    if (urls.empty())
    {
        return IcppErrc::BadParameter;
    }

    if (is_initialized_)
    {
        return IcppErrc::InvalidStatus;
    }

    clearEndpoints();

    /** remove repeat url */
    std::set<std::string>s(urls.begin(), urls.end());
    UrlList edp_urls;
    edp_urls.assign(s.begin(), s.end());

    EndpointRole edp_role = BrokerRole2EndpointRole(this->role());

    for (size_t i = 0; i < edp_urls.size(); i++)
    {
        std::string& uri = edp_urls[i];
        if (uri.empty())
        {
            continue;
        }
        Endpoint* edp = endpoint_manager_.find(uri, edp_role);

        if (edp)
        {
            IcppErrc ret = addEndpoint(edp, param.connection_state_handler);

            if (IcppErrc::OK != ret)
            {
                /** todo refine me?? */
                LOGE("add endpoint failed(%s) for url(%s)\n", core::ErrorStr(ret), uri.c_str());
                return ret;
            }
        }
    }
    
    protocol_version_ = param.protocol_version;
    interface_version_ = param.interface_version;
    broker_name_ = param.broker_name;
    is_initialized_ = true;

    return IcppErrc::OK;
}

MessageBrokerSimple::IcppErrc MessageBrokerSimple::uninitialize() 
{
    if (!is_initialized_)
    {
        return IcppErrc::InvalidStatus;
    }

    stop();
    clearEndpoints();

    return IcppErrc::OK;
}

MessageBrokerSimple::IcppErrc MessageBrokerSimple::start()
{
    if (is_started_)
    {
        return IcppErrc::InvalidStatus;
    }

    for (size_t i = 0; i < endpoints_.size(); i++)
    {
        Endpoint* ep = endpoints_[i];

        if (ep)
        {
            IcppErrc errc = ep->start();

            if (errc != IcppErrc::OK)
            {/** todo, is need ignore error */
                return errc;
            }
        }
    }

    is_started_ = true;

    return IcppErrc::OK;
    
}

MessageBrokerSimple::IcppErrc MessageBrokerSimple::stop()
{
    if (is_started_ == false)
    {
        return IcppErrc::InvalidStatus;
    }

    for (size_t i = 0; i < endpoints_.size(); i++)
    {
        Endpoint* ep = endpoints_[i];

        if (ep)
        {
            IcppErrc errc = ep->stop();

            if (errc != IcppErrc::OK)
            {/** todo, is need ignore error */
                return errc;
            }
        }
    }

    is_started_ = false;

    return IcppErrc::OK;
}


MessageBrokerSimple::IcppErrc MessageBrokerSimple::addEndpoint(Endpoint* ep, MessageBrokerConnectionStateHandler connection_state_handler)
{
    if (ep == nullptr)
    {
        return IcppErrc::BadParameter;
    }

    if (CheckEndpointExist(endpoints_, ep))
    {
        return IcppErrc::AlreadyExisted;
    }

    ep->setEndpointIdAllocator([this]() -> EndpointId { return this->next_endpoint_id_++; });

    ep->setConnectionStateHandler([this, ep, connection_state_handler](EndpointId endpoint_id, bool is_connected)
    {
        //connection_state_handler
        if (is_connected)
        {
            this->addSourceEndpoint(endpoint_id, ep);
        }
        else
        {
            this->removeSourceEndpoint(endpoint_id);
        }

        if (connection_state_handler)
        {
            connection_state_handler(endpoint_id, is_connected);
        }

    }); /** todo, refine me?? */


    ep->setReceiveHandler([this](MessagePtr msg)
    {/**todo, is need add this workflow a thread to run?*/
        if ((msg == nullptr))
        {
            /** todo, need send error to source broker */
            LOGE("invalid message\n");
            return ;
        }

        if (this->checkMessageVersion(msg))
        {
            this->onMessageProcess(msg);
        } 
    });

    endpoints_.push_back(ep);

    return IcppErrc::OK;
}


void MessageBrokerSimple::clearEndpoints()
{
    for (size_t i = 0; i < endpoints_.size(); i++)
    {
        Endpoint* ep = endpoints_[i];
        if (ep)
        {
            delete ep;
        }
    }

    endpoints_.clear();
}

void MessageBrokerSimple::responseError(MessagePtr msg_ptr, IcppErrc err)
{
    /** todo, refineme, Is it necessary to rule out this possibility */
    if (msg_ptr->type() != MessageType::kRequestNoReturn)
    {
        return ;
    }

    msg_ptr->set_error_code(err);
    sendMessage(msg_ptr);    
}

void MessageBrokerSimple::onMessageProcess(MessagePtr msg_ptr)
{
    if (onReply(msg_ptr))
    { /** return true, mean that current message is a reply message and successfull processed, so just return */
        return ;
    }

    MessageMap::iterator it = messages_.find(msg_ptr->type());
    if (it != messages_.end())
    {
        MessageHandlerMap& mhm = it->second;
        MessageHandlerMap::iterator handler = mhm.find(msg_ptr->message_id());

        if (handler == mhm.end())
        {
            handler = mhm.find(INVALID_MESSAGE_ID); /** indicate trigger all message ids with the type */
        }


        if (handler != mhm.end())
        {
            if (handler->second)
            {
                handler->second(msg_ptr);
            }
            else
            {
                LOGE("the message handler is nullptr, with type: %d, and id: %d\n", msg_ptr->type(), msg_ptr->message_id());
                responseError(msg_ptr, IcppErrc::NotImplemented);
            }
        } 
        else
        {
            LOGE("not found message id(%d), with type: %d\n", msg_ptr->message_id(), msg_ptr->type());
            responseError(msg_ptr, IcppErrc::NotFound);           
        } 
    }
    else
    {
        LOGE("unsupported message type: %d, and the message id: %d\n", msg_ptr->type(), msg_ptr->message_id());
        responseError(msg_ptr, IcppErrc::UnsupportedType);
    }
}

bool MessageBrokerSimple::checkMessageVersion(MessagePtr msg_ptr)
{
    if (protocol_version() != msg_ptr->protocol_version())
    {
        LOGE("the protocol version mismatch, our ver(%d), requester ver(%d), broker_name(%s)\n",
            protocol_version(), msg_ptr->protocol_version(), broker_name().c_str());
        responseError(msg_ptr, IcppErrc::ProtocolVersionErr);
        return false;
    }

    if (interface_version() != msg_ptr->interface_version())
    {
        LOGE("the interface version mismatch, our ver(%d), requester ver(%d), broker_name(%s)\n",
            protocol_version(), msg_ptr->protocol_version(), broker_name().c_str());
        responseError(msg_ptr, IcppErrc::InterfaceVersionErr);
        return false;
    }    

    return true;   
}

void MessageBrokerSimple::addSourceEndpoint(EndpointId endpoint_id, Endpoint* ep)
{
    connected_endpoints_[endpoint_id] = ep;
}

void MessageBrokerSimple::removeSourceEndpoint(EndpointId endpoint_id)
{
    EndpointMap::iterator it = connected_endpoints_.find(endpoint_id);
    if (it != connected_endpoints_.end())
    {
        connected_endpoints_.erase(it);
    }
}

void MessageBrokerSimple::removeSourceEndpoint(Endpoint* ep)  /** remove all endpoint id map with ep */
{
    EndpointMap::iterator it = connected_endpoints_.begin();
    for (; it != connected_endpoints_.end(); ++it)
    {
        if (it->second == ep)
        {
            connected_endpoints_.erase(it);
        }
    }
}

void MessageBrokerSimple::clearSourceEndpoint()
{
    connected_endpoints_.clear();
}

Endpoint* MessageBrokerSimple::findEndpoint(EndpointId endpoint_id)
{
    EndpointMap::iterator it = connected_endpoints_.find(endpoint_id);
    if (it != connected_endpoints_.end())
    {
        return it->second;
    }    

    return nullptr;
}

void MessageBrokerSimple::addResponseHandler(MessagePtr msg_ptr, MessageType reply_type,  MessageHandler reply_handler)
{
    if (reply_handler)
    {
        replies_[reply_type][msg_ptr->session_id()] = reply_handler;
    }        

}

void MessageBrokerSimple::removeResponseHandler(MessageType type, MessageId message_id, SessionId session_id)
{
    ReplyMap::iterator it = replies_.find(type);
    if (it != replies_.end())
    {
        ReplyHandlerMap& rhm = it->second;
        ReplyHandlerMap::iterator rhit = rhm.find(session_id);

        if (rhit != rhm.end())
        {
            rhm.erase(rhit);
        }
    } 
}

bool MessageBrokerSimple::onReply(MessagePtr msg_ptr)
{
    ReplyMap::iterator it = replies_.find(msg_ptr->type());
    if (it != replies_.end())
    {
        ReplyHandlerMap& rhm = it->second;
        ReplyHandlerMap::iterator rhit = rhm.find(msg_ptr->session_id());

        if (rhit != rhm.end()) 
        {
            if (rhit->second)
            {
                rhit->second(msg_ptr);
                rhm.erase(rhit);
                return true;
            }
        }
    }     

    return false;
}

SessionId MessageBrokerSimple::getSessionId()
{
    next_session_id_++;
    if (next_session_id_ == INVALID_SESSION_ID)
    {
        next_session_id_ = 1; /**todo, or do  ++ again */
    }

    return next_session_id_;
}

MessageBrokerSimple::MessagePtr MessageBrokerSimple::newMessage(MessageType type, MessageId msg_id, PayloadPtr payload_ptr /*= nullptr*/)
{
    MessagePtr msg_ptr = std::make_shared<Message>(type); /** todo refine me? */

    if (msg_ptr)
    {
        msg_ptr->set_message_id(msg_id);
        msg_ptr->set_protocol_version(protocol_version());
        msg_ptr->set_interface_version(interface_version());
        msg_ptr->set_payload(payload_ptr);
    }

    return msg_ptr;
}

MessageBrokerSimple::SerializerPtr MessageBrokerSimple::newSerializer()
{
    return std::make_shared<Serializer>();
}

MessageBrokerSimple::DeserializerPtr MessageBrokerSimple::newDeserializer(PayloadPtr payload)
{
    return std::make_shared<Deserializer>(payload);
}

MessageBrokerSimple::IcppErrc MessageBrokerSimple::registerMessageHandler(MessageType type, MessageId message_id,  MessageHandler handler)
{
    messages_[type][message_id] = handler;

    return IcppErrc::OK;
}

MessageBrokerSimple::IcppErrc MessageBrokerSimple::unregisterMessageHandler(MessageType type, MessageId message_id)
{
    MessageMap::iterator it = messages_.find(type);
    if (it != messages_.end())
    {
        MessageHandlerMap& mhm = it->second;
        MessageHandlerMap::iterator handler = mhm.find(message_id);

        if (handler != mhm.end())
        {
            mhm.erase(handler);
            return IcppErrc::OK;
        }  
    }

    return IcppErrc::NotFound;
}

/** if register message handler with 'type', 'message id' and 'handler', 
 * then curent registered 'parent' handler will not be called when 'message id' is received.
 **/
/** register message handler for all message id with the type */
MessageBrokerSimple::IcppErrc MessageBrokerSimple::registerMessageHandler(MessageType type, MessageHandler handler)
{
    /** register with INVALID_MESSAGE_ID mean that for all message id of current message type(like: any )*/
    return registerMessageHandler(type, INVALID_MESSAGE_ID, handler);
}
/** unregister all message handler with the message type */
MessageBrokerSimple::IcppErrc MessageBrokerSimple::unregisterMessageHandler(MessageType type) 
{
    return unregisterMessageHandler(type, INVALID_MESSAGE_ID);
} 

/** register a message handler and send a subscribe message */
MessageBrokerSimple::IcppErrc MessageBrokerSimple::subscribe(MessageType msg_type, MessageType subscribe_type, MessageId message_id, MessageHandler handler) 
{
    registerMessageHandler(msg_type, message_id, handler);
    MessagePtr msg_ptr = newMessage(subscribe_type, message_id);
    if (msg_ptr == nullptr)
    {
        unregisterMessageHandler(msg_type, message_id);
        return IcppErrc::InsufficientResources;
    }

    IcppErrc ret = sendMessage(msg_ptr); /** todo is need care about ack?? */
    if (IcppErrc::OK != ret)
    {
        unregisterMessageHandler(msg_type, message_id);                       
    }
    
    return ret; 
}

MessageBrokerSimple::IcppErrc MessageBrokerSimple::unsubscribe(MessageType msg_type, MessageType unsubscribe_type, MessageId message_id)
{
    unregisterMessageHandler(unsubscribe_type, message_id);
    MessagePtr msg_ptr = newMessage(unsubscribe_type, message_id);

    return sendMessage(msg_ptr);
}

MessageBrokerSimple::IcppErrc MessageBrokerSimple::resubscribe(MessageType msg_type, MessageType subscribe_type)
{
    const MessageHandlerMap& subscribed_msgs = registered_message_handler_map(msg_type);
    for (auto & it: subscribed_msgs)
    {
        MessagePtr msg_ptr = newMessage(subscribe_type, it.first);
        sendMessage(msg_ptr);  /** todo , is need care about  return value */
    }

    return IcppErrc::OK; /** todo refine me?? */
}


MessageBrokerSimple::IcppErrc MessageBrokerSimple::sendMessage(MessagePtr msg_ptr)
{
    if (msg_ptr != nullptr)
    {
        return IcppErrc::BadParameter;
    }

    if (msg_ptr->session_id() == INVALID_SESSION_ID)
    {
        msg_ptr->set_session_id(getSessionId());
    }

    if (msg_ptr->client_id() == INVALID_CLIENT_ID)
    {
        for (size_t i = 0; i < endpoints_.size(); i++)
        {
            Endpoint* ep = endpoints_[i];
            if (ep)
            {
                IcppErrc errc = ep->send(msg_ptr);
                
                if (errc != IcppErrc::OK)
                {
                    LOGE("broker send message with endpoint(%s) failed:%s\n", ep->name().c_str(), ErrorStr(errc));
                }
            }
        }        
    }
    else
    {
        Endpoint* ep = findEndpoint(msg_ptr->client_id());

        if (ep)
        {
            IcppErrc ret = ep->send(msg_ptr);

                if (ret != IcppErrc::OK)
                {
                    LOGE("broker send message with endpoint(%s) failed:%s\n", ep->name().c_str(), ErrorStr(ret));

                    return ret;
                }            
        }
    }


    return IcppErrc::OK;    
}

MessageBrokerSimple::IcppErrc MessageBrokerSimple::sendAndReply(MessagePtr msg_ptr, MessageType reply_type,  MessageHandler  reply_handler)
{
    if (msg_ptr != nullptr)
    {
        return IcppErrc::BadParameter;
    }

    addResponseHandler(msg_ptr, reply_type, reply_handler);

    IcppErrc ret = sendMessage(msg_ptr);

    if (IcppErrc::OK != ret)
    {
        removeResponseHandler(reply_type, msg_ptr->message_id(), msg_ptr->session_id());
    }

    return ret;
}

const MessageBrokerSimple::MessageMap& MessageBrokerSimple::registered_message_map() 
{
    return  messages_;
}

const MessageBrokerSimple::MessageHandlerMap& MessageBrokerSimple::registered_message_handler_map(MessageType type)
{
    return messages_[type]; /** todo ,refine me?? */    
}

} /** namespace com */
} /** namespace icpp */
